Fanout 基础 (没有路由Key)

image-20190810185631548

image-20190810185747360


SpringBoot+RabbitMQ的简单实现之Fanout模式

1.在pom中添加springboot对amqp的支持

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.在application.properties中添加RabbitMQ的简单配置信息

1
2
3
4
5
# Host of the RabbitMQ broker the application connects to
spring.rabbitmq.host=127.0.0.1
# 5672 is the port used for sending messages; 15672 is the port of the management console
spring.rabbitmq.port=5672
# NOTE(review): guest/guest looks like a default local account — confirm, and do not reuse outside local development
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3.配置Queue(消息队列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class QueueConfig {
// 队列1 A
@Bean(name = "queue-fanoutA")
public Queue queue_fanoutA() {
return new Queue("queue-fanoutA");
}
// 队列2 B
@Bean(name = "queue-fanoutB")
public Queue queue-fanoutB() {
return new Queue("queue-fanoutB");
}
// 队列3 C
@Bean(name = "queue-fanoutC")
public Queue queue-fanoutC() {
return new Queue("queue-fanoutC");
}

//交换机。类型是 Fanout (fanout_exchange)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_exchange");
}

//交换机和队列A 绑定
@Bean
Binding bindingExchangeFanoutA(@Qualifier("queue-fanoutA") Queue queue-fanoutA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue-fanoutA).to(fanoutExchange);
}

//交换机和队列B 绑定
@Bean
Binding bindingExchangeFanoutB(@Qualifier("queue-fanoutB") Queue queue-fanoutB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue-fanoutB).to(fanoutExchange);
}
//交换机和队列C 绑定
@Bean
Binding bindingExchangeFanoutC(@Qualifier("queue-fanoutC") Queue queue-fanoutC, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue-fanoutC).to(fanoutExchange);
}

}

4.编写消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Message producer for the fanout example.
 */
@Component
public class Sender_Fanout {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code message} to the given exchange.
     *
     * <p>With a fanout exchange RabbitMQ ignores the routing key and delivers the
     * message to every bound queue (each queue receives a copy).
     *
     * <p>The routing key supplied by the caller is now forwarded instead of being
     * replaced by a hard-coded {@code null}; a caller that passes {@code null}
     * gets exactly the call the previous implementation made.
     *
     * @param exchangeName name of the exchange to publish to
     * @param routingKey   routing key to send along; irrelevant for a fanout exchange
     * @param message      the message to publish
     */
    public void send(String exchangeName, String routingKey, Message message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
}

5.编写消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Message consumers for the fanout example: one listener per queue, each of
 * which prints the decoded message body to standard output.
 */
@Component
public class Receive_Fanout {

    /**
     * Consumes messages from {@code queue-fanoutA}.
     *
     * @param message the delivered message
     * @throws UnsupportedEncodingException no longer thrown by the body; the
     *         clause is kept so the method signature stays unchanged
     */
    @RabbitListener(queues = "queue-fanoutA")
    public void processA(Message message) throws UnsupportedEncodingException {
        System.out.println("Receive-FanoutA:" + decodeBody(message));
    }

    /**
     * Consumes messages from {@code queue-fanoutB}.
     *
     * @param message the delivered message
     * @throws UnsupportedEncodingException no longer thrown by the body; the
     *         clause is kept so the method signature stays unchanged
     */
    @RabbitListener(queues = "queue-fanoutB")
    public void processB(Message message) throws UnsupportedEncodingException {
        System.out.println("Receive-FanoutB:" + decodeBody(message));
    }

    /**
     * Consumes messages from {@code queue-fanoutC}.
     *
     * @param message the delivered message
     * @throws UnsupportedEncodingException no longer thrown by the body; the
     *         clause is kept so the method signature stays unchanged
     */
    @RabbitListener(queues = "queue-fanoutC")
    public void processC(Message message) throws UnsupportedEncodingException {
        System.out.println("Receive-FanoutC:" + decodeBody(message));
    }

    /**
     * Decodes the message body to text.
     *
     * <p>The charset is looked up from the content-encoding property first and
     * then from the content-type property. The second lookup keeps compatibility
     * with publishers that put a charset name such as {@code "UTF-8"} into the
     * content type, which is what the previous implementation required.
     */
    private static String decodeBody(Message message) {
        MessageProperties properties = message.getMessageProperties();
        java.nio.charset.Charset charset =
                resolveCharset(properties.getContentEncoding(), properties.getContentType());
        return new String(message.getBody(), charset);
    }

    /**
     * Returns the first candidate that names a supported charset, or UTF-8 when
     * none does, so a MIME-type content type or a missing property no longer
     * makes the listener fail.
     */
    private static java.nio.charset.Charset resolveCharset(String... candidates) {
        for (String candidate : candidates) {
            if (candidate == null || candidate.isEmpty()) {
                continue;
            }
            try {
                if (java.nio.charset.Charset.isSupported(candidate)) {
                    return java.nio.charset.Charset.forName(candidate);
                }
            } catch (java.nio.charset.IllegalCharsetNameException ignored) {
                // Not a syntactically valid charset name (e.g. a MIME type containing '/'); try the next one.
            }
        }
        return java.nio.charset.StandardCharsets.UTF_8;
    }
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Integration test that publishes one message to {@code fanout_exchange}
 * through {@link Sender_Fanout}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRabbitMQ_Fanout {

    @Autowired
    private Sender_Fanout sender_Fanout;

    /**
     * Builds a message (body + properties) and sends it to the fanout exchange
     * with an empty routing key.
     */
    @Test
    public void testRabbit_Fanout() {
        // Declare the message: body plus properties.
        MessageProperties messageProperties = new MessageProperties();
        // Delivery mode: PERSISTENT means the message is persisted, NON_PERSISTENT means it is not.
        messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        // Receive_Fanout in this tutorial reads the content type as the charset name,
        // so the value is kept as "UTF-8"; the charset is additionally declared in the
        // content-encoding property.
        messageProperties.setContentType("UTF-8");
        messageProperties.setContentEncoding("UTF-8");

        // Encode explicitly as UTF-8 so the bytes match the declared charset
        // regardless of the platform default.
        byte[] body = "hello,rabbit_topic!".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);

        sender_Fanout.send("fanout_exchange", "", message);
    }
}



1
2
3
4
5
6
// 3. Bind a queue to an exchange automatically, directly from the listener annotation:
//    queue "myQueuebingExchange" is bound to exchange "myExchange".
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "myQueuebingExchange"),
                exchange = @Exchange(value = "myExchange")))
public void process(String message) {
    // Log each received payload.
    log.info("message={}", message);
}

image-20190810194411393

image-20190810194507045